- 
                Notifications
    
You must be signed in to change notification settings  - Fork 13.8k
 
[FLINK-38336][state/forst] Avoid data copy during failover for ForSt statebackend #27042
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
9c4c31e    to
    a8460bc      
    Compare
  
    | .getSharedStateDirectory(); | ||
| FsCheckpointStorageAccess fsCheckpointStorageAccess = | ||
| (FsCheckpointStorageAccess) env.getCheckpointStorageAccess(); | ||
| remoteJobPath = fsCheckpointStorageAccess.getCheckpointsDirectory(); | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can only share state from the shared directory, so what exactly the remoteJobPath means?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remoteJobPath is the parent directory that ends with 'JobID'.
For example:
remoteShareWithCheckpoint==True: If the ForSt base DIR is/checkpoints/jobid-xxx/shared/op_yyy__1_1__attempt_0, thenremoteJobPathis/checkpoints/jobid-xxx.
        
          
                ...rst/src/main/java/org/apache/flink/state/forst/datatransfer/DataTransferStrategyBuilder.java
          
            Show resolved
            Hide resolved
        
      | @Nullable Path remoteBasePath) { | ||
| this.localJobPath = localJobPath; | ||
| this.localBasePath = localBasePath; | ||
| this.localForStPath = localBasePath != null ? new Path(localBasePath, DB_DIR_STRING) : null; | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we have localJobPath, localBasePath and localForStPath which is derived from localBasePath.
I was expecting localJobpath to be a subfolder of localBasePath, is this true? Is this the bean to validate that?
I see localBasePath can be null, in this case localForStPath is set to null, but localJobPath can have a value. What does this mean? The code indicates that it is possible to run without Forst but have a . Have I understood this correctly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, localJobpath is a subfolder of localBasePath.
basePath,jobPath are allowed to be null, just to stay consistent with the code before this PR. Currently the local paths can only be null in UT tests.
| 
               | 
          ||
| import javax.annotation.Nullable; | ||
| 
               | 
          ||
| /** Container for ForSt paths. */ | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it would be worth describing each path here. The text talks of Forst paths , but there are only 2 of the 6 that mention Forst - localForStPath and remoteForStPath
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comments have been added to the code.
        
          
                .../flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStPathContainer.java
              
                Outdated
          
            Show resolved
            Hide resolved
        
              
          
                .../flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStPathContainer.java
          
            Show resolved
            Hide resolved
        
      | if (pathContainer.getRemoteForStPath() != null | ||
| && pathContainer.getLocalForStPath() != null) { | ||
| if (cacheBasePath == null && pathContainer.getLocalBasePath() != null) { | ||
| cacheBasePath = new Path(pathContainer.getLocalBasePath().getPath(), "cache"); | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I assume thr log is incorrect, not should be now 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The log is correct. It means the cache path is NOT specified in the configurations, so we set it to local base path.
        
          
                ...nk-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java
              
                Outdated
          
            Show resolved
            Hide resolved
        
      | public void forceClearRemoteDirectories() throws Exception { | ||
| if (remoteBasePath != null && remotePathNewlyCreated) { | ||
| clearDirectories(remoteBasePath); | ||
| if (pathContainer.getRemoteBasePath() != null && remotePathNewlyCreated) { | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The forceClear does not seem to force anything, in fact it will only clear the directories in a specific case when remotePathNewlyCreated is set.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that is to fix FLINK-38433
| optionsContainer.getLocalBasePath(), | ||
| optionsContainer.getRemoteBasePath(), | ||
| ex); | ||
| LOG.warn("Could not delete ForSt: {}.", optionsContainer.getPathContainer(), ex); | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: it seems we lose information here. I read the log entry text before  Could not delete ForSt local working directory {} to means it is only the local directory that could not be deleted and we put out the remote directory for information. It would be good to point to the actual folder that could not be deleted in the new log message - rather than list all the folders and not know which could not be deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, 'the actual folder that could not be deleted' should be included in the ex, so i think there is no info lost here.
        
          
                ...ends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStExtension.java
              
                Outdated
          
            Show resolved
            Hide resolved
        
      | RecoveryClaimMode.CLAIM, | ||
| CopyDataTransferStrategy.class); | ||
| 
               | 
          ||
| testRestoreStrategyAsExpected( | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
parameterized test for all these permutations?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah i've had a thought of that. However, these test cases have already been simple enough, and parameterizing them would not reduce the amount of code, but only make them harder to read.
| @VisibleForTesting | ||
| Path getLocalBasePath() { | ||
| return optionsContainer.getLocalBasePath(); | ||
| return optionsContainer.getPathContainer().getLocalBasePath(); | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is optionsContainer.getPathContainer() ever null?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No. pathContainer is final and it should never be null
193c2af    to
    9a2c99a      
    Compare
  
    There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thanks for the fix
…statebackend (apache#27042) (cherry picked from commit 78f6e77)
…statebackend (apache#27042) (cherry picked from commit 78f6e77)
What is the purpose of the change
This PR fixes FLINK-38336, by allowing ForSt statebackend to reuse the restored files in failover scenario.
Brief change log
ForStPathContainerReusableDataTransferStrategyif we are in a failover scenarioVerifying this change
This change added tests and can be verified as follows:
DataTransferStrategyTest#testBuildingStrategyAsExpectedDoes this pull request potentially affect one of the following parts:
@Public(Evolving): (no)Documentation